package com.godenwater.recv.service;

import com.godenwater.core.spring.Application;
import com.godenwater.core.spring.BaseDao;
import com.godenwater.recv.server.yf.YFMessageBodyAsc;
import com.godenwater.recv.server.yf.YFMessageBodyFlow;
import com.godenwater.recv.server.yf.YfParser;
import com.godenwater.recv.server.yf.YfParserFlow;
import com.godenwater.yanyu.YYBuilder;
import com.godenwater.web.manager.StationManager;
import com.godenwater.web.rtu.model.RtuStationModel;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * 1.1【湖北一方】报文消费者
 *
 * @author lipujun
 * @ClassName: MessageConsumer
 * @Description: 通过线程的方式将“报文”解析，然后入库，这是消息进行处理的第一个环节
 * <p>
 * 5 分钟大于0 的数据需要入库，类型23
 * 一小时等于0 的数据也需要入，类型22
 * <p>
 * 1、初始化工作，测站初始化，最新数据初始化
 * <p>
 * 注意：只启动一个消费者
 * @date Mar 14, 2013
 */
public class MessageYfConsumer extends AbstractMessageConsumer implements
        Runnable {

    private static Logger logger = LoggerFactory
            .getLogger(MessageYfConsumer.class);

    SimpleDateFormat sdf22 = new SimpleDateFormat("yyMMddHH");
    SimpleDateFormat sdf21 = new SimpleDateFormat("yyMMddHHmm");
    SimpleDateFormat sdf24 = new SimpleDateFormat("yyMMddHHmmss");

    private static long DAY = 24 * 60 * 60 * 1000;

    private static long HOUR = 60 * 60 * 1000;

    private static String KEY = "messages";

    private BaseDao dao = (BaseDao) Application.getInstance()
            .getBean("baseDao");

    public MessageYfConsumer() {

    }

    @Override
    public void run() {
        int i = 0;

    }

    /**
     * 处理业务逻辑
     * <p>
     * 第一步：原始报文入库
     * <p>
     * 第二步：判断单报、多报。单报写“入库队列”；多报写文件，并根据最后一条报文写“合并队列”
     *
     * @param msgstr
     */
    public void process(String channel, String msgstr) {
        logger.info(">>【湖北一方】原始报文開始入庫！");
        logger.info(">>【湖北一方】" + msgstr);
        try {
            // 解析报文体
            String[] msg = null;
            //以#符号确认雨量水位解析，流速报文数据解析，流速报文数据中存有#号
            if (null != msgstr && !"".equals(msgstr) && !msgstr.contains("#M")) {
                msg = msgstr.split(" ");
            } else {//流速报文以换行符分隔
                msg = msgstr.replaceAll("\r", "").split(";");
            }
            logger.info(">>报文详情：" + msg[0]);
            for (int m = 0; m < msg.length - 1; m++) {
                String stcds = msg[m].substring(1, 5);
                RtuStationModel station = StationManager.getInstance().getStation(stcds);
                if (station != null) {
                    //一方流速站sttp等于16，反之为水位雨量站协议处理
                    if (!msg[m].contains("#M")) {
                        logger.info(">>雨量水位报文详情：" + msg[0]);
                        YFMessageBodyAsc message = YfParser.parseAsc(msg[m]);
                        //实时数据
                        String id = UUID.randomUUID().toString();
                        String stcd = message.getStcd();
                        String sttp = message.getSttp();
                        String viewdate = message.getViewdate();
                        this.writeRtuMessage(id, stcd, "YF", channel, message.getFunccode(), 1, sdf22.parse(viewdate), 1, 1, msgstr, true, "", "", 0, 0);
                        //获取国家站号
                        stcd = station.getStcd8();
                        //需根据类型来获取数据
                        ////01 02 12 03 13	雨量 并行水位 并行水文 串行水位 串行水文
                        if (message.getFunccode().equals("22")) {
                            //得到相对于报文中时间的航一个整点
                            Date vd = sdf22.parse(viewdate);
                            String[] rains = message.getRain();
                            //20180606韩曦修改，一方雨量数据以倒序排列，故将雨量数组反转
                            for (int i = 0; i < rains.length / 2; i++) {
                                String temp1 = rains[i];
                                rains[i] = rains[rains.length - i - 1];
                                rains[rains.length - i - 1] = temp1;
                            }
                            String[] rivers = message.getRiver();
                            for (int i = 0; i < rivers.length / 2; i++) {
                                String temp2 = rivers[i];
                                rivers[i] = rivers[rivers.length - i - 1];
                                rivers[rivers.length - i - 1] = temp2;
                            }
                            DateTime mdt = new DateTime(vd);
                            Calendar cal = Calendar.getInstance();
                            cal.setTime(mdt.toDate());
                            cal.add(Calendar.HOUR_OF_DAY, -1);
                            mdt = new DateTime(cal.getTime());
                            //得到上一时段整点时间
                            mdt = mdt.minusMinutes(mdt.getMinuteOfHour());
                            String rdata = "";
                            double totolRainData = 0.0;//累计雨量
                            double tmp5minRain = -1.0;//时段雨量
                            for (int i = 0; i < 12; i++) {
                                mdt = mdt.plusMinutes(5);
                                if (sttp.equals("01") || sttp.equals("12") || sttp.equals("13")) {//只入雨量数据
                                    Calendar cal1 = Calendar.getInstance();
                                    cal1.setTime(mdt.toDate());
                                    cal1.add(Calendar.HOUR_OF_DAY, -1);
                                    Date lastHour = cal1.getTime();
                                    if (!rains[i].contains("?")) {
                                        //1、取当前的雨量数据
                                        int rainCount = Integer.parseInt(new String(rains[i]));
                                        BigDecimal bd = new BigDecimal(rainCount);
                                        bd = bd.multiply(new BigDecimal(0.5));
                                        totolRainData = bd.doubleValue();
                                        //2、获取原先的雨量数据，进行相减

                                        //循环第一次，上一时段5分钟临时累计雨量取前5分钟雨量累计值
                                        if (i == 0) {
                                            DateTime tmpData = mdt;
                                            tmpData = tmpData.plusMinutes(-5);
                                            tmp5minRain = getRealRain(stcd, tmpData.toDate(), 23);
                                        }

                                        //写入累计雨量到临时库，5分钟雨量
                                        writeYFConnvertData(stcd, mdt.toDate(), "1", 23, totolRainData, 999);

                                        logger.info(">>Date:" + mdt.toString() + ",tmp5minRain:" + tmp5minRain + ",totolRainData" + totolRainData);
                                        if (tmp5minRain != -1.0) {
                                            //如果上一个点未缺报，写入雨量到临时库，5分钟雨量
                                            writeYFConnvertData(stcd, mdt.toDate(), "1", 23, totolRainData - tmp5minRain, 41);
                                        }
                                        tmp5minRain = totolRainData;
                                        //写入整点雨量数据
                                        if (cal1.get(Calendar.MINUTE) == 0) {
                                            Double lastHourRain = getRealRain(stcd, lastHour, 22);
                                            //写入累计雨量到临时库，时段雨量
                                            writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData, 999);
                                            //写入雨量到临时库，时段雨量
                                            logger.info(">>Date:" + mdt.toString() + ",lastHourRain:" + lastHourRain + ",totolRainData" + totolRainData);
                                            if (lastHourRain != -1.0) {
                                                writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData - lastHourRain, 41);
                                            }
                                        }
                                    }
        						/*else if( cal1.get(Calendar.MINUTE)==0 ){
        							Double lastHourRain= getRealRain(stcd, lastHour,22);
    								//写入累计雨量到临时库，时段雨量
    								writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData, 999);
    								//写入雨量到临时库，时段雨量
    								logger.info(">>Date:"+mdt.toString()+",lastHourRain:"+lastHourRain +",totolRainData"+totolRainData);
    								if(lastHourRain!=-1.0) {
    									writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData-lastHourRain, 41);
    								}
        						}*/
                                }

                                if (sttp.equals("02") || sttp.equals("12")) {//并行水位
                                    if (!rivers[i].contains("?")) {
                                        rdata = YYBuilder.buildDot(rivers[i], 2);
                                        writeYFConnvertData(stcd, mdt.toDate(), "1", 11, addRiverDtmVal(station, rdata), 41);
                                    }
//        						else{
//        							//若水位值包含？则不入库，故注释此行
//       							writeYFConnvertData(stcd, mdt.toDate(), "1", 11,addRiverDtmVal(station,rdata), 41);
//        						}
                                }

                                if (sttp.equals("03")) {//并行水文
                                    Calendar cal1 = Calendar.getInstance();
                                    cal1.setTime(mdt.toDate());
                                    cal1.add(Calendar.HOUR_OF_DAY, -1);
                                    Date lastHour = cal1.getTime();
                                    if (!rains[i].contains("?") && !rivers[i].contains("?")) {
                                        //1、取当前的雨量数据
                                        int rainCount = Integer.parseInt(new String(rains[i]));
                                        BigDecimal bd = new BigDecimal(rainCount);
                                        bd = bd.multiply(new BigDecimal(0.5));
                                        totolRainData = bd.doubleValue();
                                        //获取水位数据，将报文数据差分为带2位小数点数据。
                                        rdata = YYBuilder.buildDot(rivers[i], 2);
                                        //2、获取原先的雨量数据，进行相减
                                        //循环第一次，上一时段5分钟临时累计雨量取前5分钟雨量累计值
                                        if (i == 0) {
                                            DateTime tmpData = mdt;
                                            tmpData = tmpData.plusMinutes(-5);
                                            tmp5minRain = getRealRain(stcd, tmpData.toDate(), 23);
                                        }

                                        //写入累计雨量到临时库，5分钟雨量
                                        writeYFConnvertData(stcd, mdt.toDate(), "1", 23, totolRainData, 999);
                                        //写入雨量到临时库，5分钟雨量
                                        logger.info(">>Date:" + mdt.toString() + ",tmp5minRain:" + tmp5minRain + ",totolRainData" + totolRainData);
                                        if (tmp5minRain != -1.0) {
                                            writeYFConnvertData(stcd, mdt.toDate(), "1", 23, totolRainData - tmp5minRain, 41);
                                        }
                                        tmp5minRain = totolRainData;

                                        //写入整点雨量数据
                                        if (cal1.get(Calendar.MINUTE) == 0) {
                                            Double lastHourRain = getRealRain(stcd, lastHour, 22);
                                            //写入累计雨量到临时库，时段雨量
                                            writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData, 999);
                                            //写入雨量到临时库，时段雨量
                                            logger.info(">>Date:" + mdt.toString() + ",lastHourRain:" + lastHourRain + ",totolRainData" + totolRainData);
                                            if (lastHourRain != -1.0) {
                                                writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData - lastHourRain, 41);
                                            }
                                        }

                                        writeYFConnvertData(stcd, mdt.toDate(), "1", 11, addRiverDtmVal(station, rdata), 41);
                                    }
//        						else if( cal1.get(Calendar.MINUTE)==0 ){
//        							Double lastHourRain= getRealRain(stcd, lastHour,22);
//    								//写入累计雨量到临时库，时段雨量
//    								writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData, 999);
//    								//写入雨量到临时库，时段雨量
//    								logger.info(">>Date:"+mdt.toString()+",lastHourRain:"+lastHourRain +",totolRainData"+totolRainData);
//    								if(lastHourRain!=-1.0) {
//    									writeYFConnvertData(stcd, mdt.toDate(), "1", 22, totolRainData-lastHourRain, 41);
//    								}
//    								writeYFConnvertData(stcd, mdt.toDate(), "1", 11,addRiverDtmVal(station,rdata), 41);
//        						}
//        						else{
//        							//若水位值包含？则不入库，故注释此行
//        							writeYFConnvertData(stcd, mdt.toDate(), "1", 11,addRiverDtmVal(station,rdata), 41);
//        						}
                                }
                            }
                        }

                        //加报报数据
                        if (message.getFunccode().equals("21")) {
                            Date vd = sdf21.parse(viewdate);
                            String[] rains = message.getRain();
                            String[] rivers = message.getRiver();

                            if (sttp.equals("01") || sttp.equals("12") || sttp.equals("13")) {//只入雨量数据
                                if (!rains[0].contains("?")) {
                                    writeYFConnvertData(stcd, vd, "1", 1, Double.parseDouble(rains[0]), 41);
                                }
                            }

                            if (sttp.equals("02") || sttp.equals("03") || sttp.equals("12")) {//并行水位
                                if (!rivers[0].contains("?")) {
                                    String rdata = YYBuilder.buildDot(rivers[0], 2);
                                    writeYFConnvertData(stcd, vd, "1", 11, addRiverDtmVal(station, rdata), 41);
                                }
                            }
                        }
                    } else {//流速协议解
                        logger.info(">>流速处理入库详情：" + msg[m]);
                        try {
                            YFMessageBodyFlow message = YfParserFlow.parseAsc(msg[m]);

                            String id = UUID.randomUUID().toString();
                            String stcd = message.getStcd();
                            String viewdate = message.getViewdate();
                            this.writeRtuMessage(id, stcd, "YF", channel, message.getFunccode(), 1, sdf24.parse(viewdate), 1, 1, msgstr, true, "", "", 0, 0);
                            stcd = station.getStcd8();
                            Date vd = sdf24.parse(viewdate);
                            writeYFConnvertData(stcd, vd, "1", 19, Double.parseDouble(message.getFlow()), 41);
                        } catch (Exception e) {
                            logger.info(">>流速解析报错：" + e);
                        }
                    }
                } else {
                    //测站不存在
                    writeRtuAlarm(stcds, "", "", new Date(), "YF 测站不存在！");
                    logger.info(stcds + " 测站不存在 YF 协议！");
                }
            }

        } catch (Exception e) {
            logger.info(">>报错：" + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 加上水位基值…
     *
     * @param station
     * @param river
     * @return
     */
    public double addRiverDtmVal(RtuStationModel station, String river) {
        //加上水位基值，不要使用double计算
        double dtm = station.getDtmel() == null ? 0 : station.getDtmel().doubleValue();
        BigDecimal rval = new BigDecimal(river);
        rval = rval.add(new BigDecimal(dtm));
        return rval.doubleValue();
    }

    /**
     * 将原始报文消息写入数据库
     */
    public void writeRtuMessage(String stcd, String f26) {
        // 插入原始报文

        Map params = new HashMap();

        String sql = "update rtu_message_new set f26 = ? where stcd = ?";
        params.put("f26", f26);
        params.put("stcd", stcd);

        try {
            dao.getJdbcTemplate().update(sql, new Object[]{f26, stcd});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] arg) throws Exception {

        byte[] aaa = new byte[2];
        MessageYfConsumer consumer = new MessageYfConsumer();

        String message = "$40261G22030402170315011263002131270400213127040021312704002130270400213027040021302704002130270400213027040021302704002130270400213027040021302704";
        message = "$40261G2103170315015500213227041262";
        consumer.process("", message);

    }

}
